Skip to content

fix(engine): per-engine threads to eliminate cross-engine stream contamination#1304

Merged
jundot merged 1 commit into
jundot:mainfrom
ivaniguarans:feat/per-engine-threads
May 27, 2026
Merged

fix(engine): per-engine threads to eliminate cross-engine stream contamination#1304
jundot merged 1 commit into
jundot:mainfrom
ivaniguarans:feat/per-engine-threads

Conversation

@ivaniguarans

Copy link
Copy Markdown
Contributor

Summary

When multiple LM engines run concurrently, they share a single _global_mlx_executor with max_workers=1, serializing all scheduler.step() calls through one thread. More critically, the MTP patch reads the module-level generation_stream via sys.modules for its forward passes, bypassing whatever stream the BatchGenerator was instantiated with. If two MTP-capable engines run simultaneously, their MTP forwards land on the same module-level stream regardless of which engine dispatched them — a stream-ordering violation that upstream's BatchGenerator(stream=...) parameter (mlx-lm 0.31.3) was designed to prevent.

This PR gives each EngineCore its own ThreadPoolExecutor and mx.Stream, passes the stream through Scheduler into BatchGenerator, and removes the _get_generation_stream() indirection from the MTP patch so MTP operations inherit the correct per-engine stream from the enclosing BatchGenerator context — the same pattern upstream's GenerationBatch._step() already uses.

The global executor is retained for non-LM engines (TTS, STT, embedding, reranker) that still rely on get_mlx_executor() and _init_mlx_thread.

Changes

  • engine_core.py: EngineCore.__init__ creates a per-engine ThreadPoolExecutor + mx.new_thread_local_stream() and passes the stream to Scheduler. close() shuts down the per-engine executor after scheduler cleanup. Added _ensure_wired_limit() so the process-global mx.set_wired_limit() runs once rather than racing across concurrent BatchGenerator inits.
  • scheduler.py: Scheduler.__init__ accepts an optional stream parameter (falls back to the module-level generation_stream when not provided). All 37 internal references to generation_stream — sync barriers, cache clears, mx.stream() context managers, BatchGenerator creation — now use self._stream.
  • batch_generator.py (MTP patch): Removed _get_generation_stream() and the 4 explicit with mx.stream(...) wrappers that pushed the module-level stream. MTP forwards now inherit the per-engine stream from the enclosing BatchGenerator context, matching GenerationBatch._step()'s existing pattern.

Concurrent throughput

Two models generating simultaneously vs sequentially:

Model pair Before (shared executor) After (per-engine) Speedup
Qwen3-0.6B + Qwen3-Coder-Next-6bit 1.00x 1.14x 0.6B TTFT: 2089 ms → 701 ms
Qwen3.6-35B-A3B-oQ4-mtp + Qwen3.6-27B-oQ6-mtp 0.93x 1.12x wall: 5408 ms → 4722 ms

Sub-2x is expected — Metal command buffers still serialize on one GPU. The win is CPU-side overlap (prefill + decode can be submitted concurrently) and eliminating head-of-line blocking where one engine's long prefill stalls another's token emission.

Test plan

  • New tests/test_per_engine_threads.py (10 tests): verifies Scheduler stores and uses explicit streams, regex-scans the Scheduler class body for bare generation_stream references, confirms each EngineCore gets a distinct executor/stream, validates executor shutdown on close(), and asserts the MTP patch no longer contains _get_generation_stream or any generation_stream reference.
  • Updated tests/test_engine_core.py: existing executor tests now assert is not (distinct executors) and concurrent execution (both executors active simultaneously).
  • Full suite passes: 4493 passed, 19 skipped.
  • Live-tested with concurrent Qwen3.6-35B-A3B-oQ4-mtp + Qwen3.6-27B-oQ6-mtp (both MTP-enabled) serving requests simultaneously.

Related to #1248

@ivaniguarans ivaniguarans force-pushed the feat/per-engine-threads branch from 0760768 to 0835164 Compare May 21, 2026 13:27
@jundot

jundot commented May 21, 2026

Copy link
Copy Markdown
Owner

Thanks for putting this together, the analysis is spot on. The per-engine stream isolation is clearly the right fix for the cross-engine contamination, and the writeup with the concurrent throughput numbers and the regex-based tests is really thoughtful work.

My only hesitation is the scope. This touches the scheduler core and the MTP patch with 37 stream references rewritten, and I want to give it more soak time than the 0.3.9 release window allows. So I am going to hold this out of the 0.3.9 stable release and pull it in starting from the next dev build instead.

I know you have been waiting on this, sorry for the delay and thanks for your patience. The change itself looks solid, I just want to land it where it can get proper testing before it hits a stable release.

…amination

Replace the shared _global_mlx_executor with per-EngineCore
ThreadPoolExecutor + mx.Stream, and fix the MTP patch reading the
module-level generation_stream instead of the per-engine stream.
@jundot jundot force-pushed the feat/per-engine-threads branch from 0835164 to 587d77c Compare May 27, 2026 05:04
@jundot

jundot commented May 27, 2026

Copy link
Copy Markdown
Owner

Rebased locally to absorb two small fixups: dropped the 5th _get_generation_stream() call inside _reconcile_mtp_to_standard (added in 6f927ec after this PR's base) and routed the new _safe_sync_generation_stream() in _async_store_cache_worker (#1437) through the same _safe_sync_stream(self._stream) helper.

The MTP-via-enclosing-context claim checks out — BatchGenerator.next wraps _next in mx.stream(self._stream) in mlx-lm 0.31.3. Merging now into the next dev build.

Thanks again for the careful writeup and the perf numbers.

@jundot jundot merged commit 56860b3 into jundot:main May 27, 2026
jundot added a commit that referenced this pull request May 27, 2026
BatchGenerator.__init__ already calls mx.set_wired_limit() on each
instance, and concurrent calls with the same value are race-free
(verified empirically). The guard never prevented the race it claimed
to fix.

Follow-up to #1304.
jundot added a commit that referenced this pull request May 27, 2026
mlx-vlm's load() only materializes model.language_model.parameters(), leaving frozen buffers (RoPE freqs) and sibling sub-trees (vision_tower, audio_tower) as lazy arrays bound to the loader thread's default stream. Pre-#1304 this was invisible because loader and forward shared one global thread; the per-engine executor split exposed it as "no Stream(gpu, X) in current thread" when mx.eval touches a sibling buffer during prefill.

Fix: materialize the full model tree on the loader thread right after load. Verified against gemma-4-E2B-it and gemma-4-31b-it-4bit.

Also fixes test_safe_sync_passes_generation_stream to match the _default_generation_stream alias introduced by #1304.

Reported by @zviratko in #1304.
jundot pushed a commit that referenced this pull request May 27, 2026
)

#1304 (``fix(engine): per-engine threads to eliminate cross-engine
stream contamination``) refactored the patched ``BatchGenerator`` to
inherit its execution stream from the enclosing engine context, and
removed the module-level ``_get_generation_stream`` helper as part of
that. ``TestBatchGeneratorDispatch._make_reconcile_batch`` still tried
to monkeypatch that name and failed at collection of every test that
depends on the fixture with ``AttributeError: module ... has no
attribute '_get_generation_stream'`` — taking down 4 reconcile-path
tests on every CI run.

The override is no longer needed: the surrounding fixture replaces
``_rebuild_singleton_cache`` and ``_call_backbone`` with fakes that
do all of their work via ``np.array`` / ``mx.array`` directly, so
neither MLX dispatch nor stream selection is reached.

Tests (tests/test_mlx_lm_mtp_patch.py::TestBatchGeneratorDispatch):
- test_reconcile_uses_queue_front_as_next_token
- test_reconcile_empty_queue_samples_from_logits
- test_reconcile_returns_false_on_empty_tokens
- test_reconcile_fallback_on_rebuild_failure

11/11 TestBatchGeneratorDispatch tests pass.
panwudi added a commit to panwudi/flyto-mlx that referenced this pull request May 27, 2026
Catch-up sync. Highlights:
- boundary-store cleanup race fix (jundot#1423) — eliminates the
  test_cleanup_all_drains_queue flake we have been carrying.
- per-engine MLX threads / streams (jundot#1304) — multiple models stepping
  scheduler.step() concurrently no longer cross-contaminate streams.
- VLM lazy state materialized on loader thread; skip MTPModule attach
  when checkpoint lacks mtp.* weights.
- Dead TieredCacheManager removed; profiles three-scope template
  refactor (jundot#1399).

4567 pass / 3 known env-override fails / 36 skip. Zero regression.
User delegated review.

---

Catch-up 同步. 主要内容:
- boundary-store 清理 race 修复 (jundot#1423) - 消掉一直拖着的
  test_cleanup_all_drains_queue flake.
- 每引擎 MLX 线程 / 流 (jundot#1304) - 多模型并发 scheduler.step()
  不再 cross-contaminate.
- VLM 在 loader 线程实例化 lazy 状态; checkpoint 无 mtp.* 权重时
  跳过 MTPModule attach.
- 删 dead TieredCacheManager; profiles three-scope template 重构
  (jundot#1399).

4567 pass / 3 known env-override fails / 36 skip. 零回归.
用户委托 review.
@ivaniguarans ivaniguarans deleted the feat/per-engine-threads branch May 27, 2026 14:16
cfbraun added a commit to cfbraun/omlx that referenced this pull request May 28, 2026
Same root cause and fix as e93c408 for the VLM MTP drafter, applied
to the SpecPrefill draft load path in both BatchedEngine and
VLMBatchedEngine.

`mlx_lm.load(specprefill_draft)` materializes only model.parameters()
via mx.eval and leaves frozen buffers (RoPE freqs, masked_embedding
tables, etc.) lazy, bound to the global mlx loader thread's stream.
The draft model is then stored on `Scheduler._specprefill_draft_model`
and later read from `score_tokens(self._specprefill_draft_model, ...)`
at scheduler.py:4247, which runs on the per-engine executor's worker
thread. The first SpecPrefill-eligible prefill raises
``no Stream(gpu, X) in current thread`` because `mx.async_eval` on the
inference thread tries to materialize lazy ops against a stream that
does not exist there.

Call materialize_lazy_state(draft_model) inside _load_draft (which
runs on get_mlx_executor()) right before returning, so every leaf
array is concrete before any inference thread reads it. The VLM
helper also refactors the dual-return into a single materialize+return
to cover both the custom_quantization and standard mlx_lm.load paths.

Closes the SpecPrefill instance of the jundot#1304 per-engine-threads bug
class identified after 9d5bed8 (main VLM model), e93c408 (VLM MTP
drafter), and 9407468 (boundary snapshots).

Not directly testable without running SpecPrefill against a per-engine
configuration, but matches the merged e93c408 pattern exactly.
cfbraun added a commit to cfbraun/omlx that referenced this pull request May 28, 2026
Same root cause and fix as e93c408 for the VLM MTP drafter, applied
to the SpecPrefill draft load path in both BatchedEngine and
VLMBatchedEngine.

`mlx_lm.load(specprefill_draft)` materializes only model.parameters()
via mx.eval and leaves frozen buffers (RoPE freqs, masked_embedding
tables, etc.) lazy, bound to the global mlx loader thread's stream.
The draft model is then stored on `Scheduler._specprefill_draft_model`
and later read from `score_tokens(self._specprefill_draft_model, ...)`
at scheduler.py:4247, which runs on the per-engine executor's worker
thread. The first SpecPrefill-eligible prefill raises
``no Stream(gpu, X) in current thread`` because `mx.async_eval` on the
inference thread tries to materialize lazy ops against a stream that
does not exist there.

Call materialize_lazy_state(draft_model) inside _load_draft (which
runs on get_mlx_executor()) right before returning, so every leaf
array is concrete before any inference thread reads it. The VLM
helper also refactors the dual-return into a single materialize+return
to cover both the custom_quantization and standard mlx_lm.load paths.

Closes the SpecPrefill instance of the jundot#1304 per-engine-threads bug
class identified after 9d5bed8 (main VLM model), e93c408 (VLM MTP
drafter), and 9407468 (boundary snapshots).

Not directly testable without running SpecPrefill against a per-engine
configuration, but matches the merged e93c408 pattern exactly.
cfbraun added a commit to cfbraun/omlx that referenced this pull request May 29, 2026
Same root cause and fix as e93c408 for the VLM MTP drafter, applied
to the SpecPrefill draft load path in both BatchedEngine and
VLMBatchedEngine.

`mlx_lm.load(specprefill_draft)` materializes only model.parameters()
via mx.eval and leaves frozen buffers (RoPE freqs, masked_embedding
tables, etc.) lazy, bound to the global mlx loader thread's stream.
The draft model is then stored on `Scheduler._specprefill_draft_model`
and later read from `score_tokens(self._specprefill_draft_model, ...)`
at scheduler.py:4247, which runs on the per-engine executor's worker
thread. The first SpecPrefill-eligible prefill raises
``no Stream(gpu, X) in current thread`` because `mx.async_eval` on the
inference thread tries to materialize lazy ops against a stream that
does not exist there.

Call materialize_lazy_state(draft_model) inside _load_draft (which
runs on get_mlx_executor()) right before returning, so every leaf
array is concrete before any inference thread reads it. The VLM
helper also refactors the dual-return into a single materialize+return
to cover both the custom_quantization and standard mlx_lm.load paths.

Closes the SpecPrefill instance of the jundot#1304 per-engine-threads bug
class identified after 9d5bed8 (main VLM model), e93c408 (VLM MTP
drafter), and 9407468 (boundary snapshots).

Not directly testable without running SpecPrefill against a per-engine
configuration, but matches the merged e93c408 pattern exactly.
cfbraun added a commit to cfbraun/omlx that referenced this pull request May 29, 2026
Same root cause and fix as e93c408 for the VLM MTP drafter, applied
to the SpecPrefill draft load path in both BatchedEngine and
VLMBatchedEngine.

`mlx_lm.load(specprefill_draft)` materializes only model.parameters()
via mx.eval and leaves frozen buffers (RoPE freqs, masked_embedding
tables, etc.) lazy, bound to the global mlx loader thread's stream.
The draft model is then stored on `Scheduler._specprefill_draft_model`
and later read from `score_tokens(self._specprefill_draft_model, ...)`
at scheduler.py:4247, which runs on the per-engine executor's worker
thread. The first SpecPrefill-eligible prefill raises
``no Stream(gpu, X) in current thread`` because `mx.async_eval` on the
inference thread tries to materialize lazy ops against a stream that
does not exist there.

Call materialize_lazy_state(draft_model) inside _load_draft (which
runs on get_mlx_executor()) right before returning, so every leaf
array is concrete before any inference thread reads it. The VLM
helper also refactors the dual-return into a single materialize+return
to cover both the custom_quantization and standard mlx_lm.load paths.

Closes the SpecPrefill instance of the jundot#1304 per-engine-threads bug
class identified after 9d5bed8 (main VLM model), e93c408 (VLM MTP
drafter), and 9407468 (boundary snapshots).

Not directly testable without running SpecPrefill against a per-engine
configuration, but matches the merged e93c408 pattern exactly.
cfbraun added a commit to cfbraun/omlx that referenced this pull request May 30, 2026
Same root cause and fix as e93c408 for the VLM MTP drafter, applied
to the SpecPrefill draft load path in both BatchedEngine and
VLMBatchedEngine.

`mlx_lm.load(specprefill_draft)` materializes only model.parameters()
via mx.eval and leaves frozen buffers (RoPE freqs, masked_embedding
tables, etc.) lazy, bound to the global mlx loader thread's stream.
The draft model is then stored on `Scheduler._specprefill_draft_model`
and later read from `score_tokens(self._specprefill_draft_model, ...)`
at scheduler.py:4247, which runs on the per-engine executor's worker
thread. The first SpecPrefill-eligible prefill raises
``no Stream(gpu, X) in current thread`` because `mx.async_eval` on the
inference thread tries to materialize lazy ops against a stream that
does not exist there.

Call materialize_lazy_state(draft_model) inside _load_draft (which
runs on get_mlx_executor()) right before returning, so every leaf
array is concrete before any inference thread reads it. The VLM
helper also refactors the dual-return into a single materialize+return
to cover both the custom_quantization and standard mlx_lm.load paths.

Closes the SpecPrefill instance of the jundot#1304 per-engine-threads bug
class identified after 9d5bed8 (main VLM model), e93c408 (VLM MTP
drafter), and 9407468 (boundary snapshots).

Not directly testable without running SpecPrefill against a per-engine
configuration, but matches the merged e93c408 pattern exactly.
jundot pushed a commit that referenced this pull request May 31, 2026
…#1485)

Same root cause and fix as e93c408 for the VLM MTP drafter, applied
to the SpecPrefill draft load path in both BatchedEngine and
VLMBatchedEngine.

`mlx_lm.load(specprefill_draft)` materializes only model.parameters()
via mx.eval and leaves frozen buffers (RoPE freqs, masked_embedding
tables, etc.) lazy, bound to the global mlx loader thread's stream.
The draft model is then stored on `Scheduler._specprefill_draft_model`
and later read from `score_tokens(self._specprefill_draft_model, ...)`
at scheduler.py:4247, which runs on the per-engine executor's worker
thread. The first SpecPrefill-eligible prefill raises
``no Stream(gpu, X) in current thread`` because `mx.async_eval` on the
inference thread tries to materialize lazy ops against a stream that
does not exist there.

Call materialize_lazy_state(draft_model) inside _load_draft (which
runs on get_mlx_executor()) right before returning, so every leaf
array is concrete before any inference thread reads it. The VLM
helper also refactors the dual-return into a single materialize+return
to cover both the custom_quantization and standard mlx_lm.load paths.

Closes the SpecPrefill instance of the #1304 per-engine-threads bug
class identified after 9d5bed8 (main VLM model), e93c408 (VLM MTP
drafter), and 9407468 (boundary snapshots).

Not directly testable without running SpecPrefill against a per-engine
configuration, but matches the merged e93c408 pattern exactly.
jonpspri pushed a commit to jonpspri/omlx that referenced this pull request Jun 12, 2026
…amination (jundot#1304)

Replace the shared _global_mlx_executor with per-EngineCore
ThreadPoolExecutor + mx.Stream, and fix the MTP patch reading the
module-level generation_stream instead of the per-engine stream.
jonpspri pushed a commit to jonpspri/omlx that referenced this pull request Jun 12, 2026
BatchGenerator.__init__ already calls mx.set_wired_limit() on each
instance, and concurrent calls with the same value are race-free
(verified empirically). The guard never prevented the race it claimed
to fix.

Follow-up to jundot#1304.
jonpspri pushed a commit to jonpspri/omlx that referenced this pull request Jun 12, 2026
mlx-vlm's load() only materializes model.language_model.parameters(), leaving frozen buffers (RoPE freqs) and sibling sub-trees (vision_tower, audio_tower) as lazy arrays bound to the loader thread's default stream. Pre-jundot#1304 this was invisible because loader and forward shared one global thread; the per-engine executor split exposed it as "no Stream(gpu, X) in current thread" when mx.eval touches a sibling buffer during prefill.

Fix: materialize the full model tree on the loader thread right after load. Verified against gemma-4-E2B-it and gemma-4-31b-it-4bit.

Also fixes test_safe_sync_passes_generation_stream to match the _default_generation_stream alias introduced by jundot#1304.

Reported by @zviratko in jundot#1304.
jonpspri pushed a commit to jonpspri/omlx that referenced this pull request Jun 12, 2026
…ndot#1445)

jundot#1304 (``fix(engine): per-engine threads to eliminate cross-engine
stream contamination``) refactored the patched ``BatchGenerator`` to
inherit its execution stream from the enclosing engine context, and
removed the module-level ``_get_generation_stream`` helper as part of
that. ``TestBatchGeneratorDispatch._make_reconcile_batch`` still tried
to monkeypatch that name and failed at collection of every test that
depends on the fixture with ``AttributeError: module ... has no
attribute '_get_generation_stream'`` — taking down 4 reconcile-path
tests on every CI run.

The override is no longer needed: the surrounding fixture replaces
``_rebuild_singleton_cache`` and ``_call_backbone`` with fakes that
do all of their work via ``np.array`` / ``mx.array`` directly, so
neither MLX dispatch nor stream selection is reached.

Tests (tests/test_mlx_lm_mtp_patch.py::TestBatchGeneratorDispatch):
- test_reconcile_uses_queue_front_as_next_token
- test_reconcile_empty_queue_samples_from_logits
- test_reconcile_returns_false_on_empty_tokens
- test_reconcile_fallback_on_rebuild_failure

11/11 TestBatchGeneratorDispatch tests pass.
jonpspri pushed a commit to jonpspri/omlx that referenced this pull request Jun 12, 2026
…jundot#1485)

Same root cause and fix as e93c408 for the VLM MTP drafter, applied
to the SpecPrefill draft load path in both BatchedEngine and
VLMBatchedEngine.

`mlx_lm.load(specprefill_draft)` materializes only model.parameters()
via mx.eval and leaves frozen buffers (RoPE freqs, masked_embedding
tables, etc.) lazy, bound to the global mlx loader thread's stream.
The draft model is then stored on `Scheduler._specprefill_draft_model`
and later read from `score_tokens(self._specprefill_draft_model, ...)`
at scheduler.py:4247, which runs on the per-engine executor's worker
thread. The first SpecPrefill-eligible prefill raises
``no Stream(gpu, X) in current thread`` because `mx.async_eval` on the
inference thread tries to materialize lazy ops against a stream that
does not exist there.

Call materialize_lazy_state(draft_model) inside _load_draft (which
runs on get_mlx_executor()) right before returning, so every leaf
array is concrete before any inference thread reads it. The VLM
helper also refactors the dual-return into a single materialize+return
to cover both the custom_quantization and standard mlx_lm.load paths.

Closes the SpecPrefill instance of the jundot#1304 per-engine-threads bug
class identified after 9d5bed8 (main VLM model), e93c408 (VLM MTP
drafter), and 9407468 (boundary snapshots).

Not directly testable without running SpecPrefill against a per-engine
configuration, but matches the merged e93c408 pattern exactly.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants